package profiterole.waffle;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;

import profiterole.api.Waffle;
import profiterole.mapreduce.MapReduceService;
import profiterole.mapreduce.Splitter;

/**
 * {@link Waffle} implementation backed by a {@link HashMap} from string keys
 * to values of type {@code T}.
 *
 * <p>The instance records {@link System#currentTimeMillis()} once, at
 * construction time; {@code update} does not change that time stamp.
 *
 * <p>No synchronization is performed on the backing map.
 *
 * @param <T> type of the values stored per key
 */
public class WaffleImpl<T extends Comparable<? super T>> implements Waffle<T>, Serializable {

	private static final long serialVersionUID = 5544962900791276368L;

	/** Number of sample entries used by the demo methods {@link #testString()} and {@link #testInteger()}. */
	private static final int DEMO_ENTRY_COUNT = 5;

	/** Creation time in milliseconds, taken from {@link System#currentTimeMillis()}. */
	private final long timeStamp;

	/** Backing storage; never {@code null}. */
	private final HashMap<String, T> map;

	/**
	 * Creates a waffle on top of the given map.
	 *
	 * <p>The map is stored by reference, not copied, so {@code update} calls on this
	 * waffle write into the caller's map as well.
	 *
	 * @param map initial content; {@code null} is replaced by a new empty map
	 */
	public WaffleImpl(HashMap<String, T> map) {
		if (map == null) {
			this.map = new HashMap<String, T>();
		} else {
			this.map = map;
		}
		this.timeStamp = System.currentTimeMillis();
	}

	/**
	 * @param key key to look up
	 * @return {@code true} if the backing map contains {@code key}
	 */
	@Override
	public boolean containsKey(String key) {
		return map.containsKey(key);
	}

	/**
	 * @param value value to look up
	 * @return {@code true} if the backing map holds a value equal to {@code value}
	 */
	@Override
	public boolean containsValue(String value) {
		return map.containsValue(value);
	}

	/**
	 * @return the time in milliseconds recorded when this instance was constructed
	 */
	@Override
	public long getTimeStamp() {
		return timeStamp;
	}

	/**
	 * @param word key to look up
	 * @return the value stored under {@code word}, or {@code null} if the backing map
	 *         returns {@code null} for it
	 */
	@Override
	public T get(String word) {
		return map.get(word);
	}

	/**
	 * @return the string form of {@link #getSortedUnModifiableList()}
	 */
	@Override
	public String toString() {
		return getSortedUnModifiableList().toString();
	}

	/**
	 * Returns the entries of this waffle ordered by key.
	 *
	 * <p>The list is a new copy of the entry set wrapped as unmodifiable; keys added to
	 * the waffle afterwards do not appear in it. The entries in the list are the
	 * backing map's own entry objects.
	 *
	 * @return unmodifiable list of entries sorted with {@link String#compareTo(String)} on the key
	 */
	@Override
	public List<Entry<String, T>> getSortedUnModifiableList() {
		List<Entry<String, T>> sorted = new ArrayList<Entry<String, T>>(map.entrySet());
		Collections.sort(sorted, new Comparator<Entry<String, T>>() {

			@Override
			public int compare(Entry<String, T> left, Entry<String, T> right) {
				return left.getKey().compareTo(right.getKey());
			}
		});
		return Collections.unmodifiableList(sorted);
	}

	/**
	 * Runs {@link MapReduceService#mapReduce} over the files that
	 * {@link Splitter#folderToFileList} yields for {@code folder} and merges the
	 * result into this waffle.
	 *
	 * <p>The map-reduce result itself is merged. Casting its sorted entry list to
	 * {@code WaffleImpl}, as was done previously, would fail with a
	 * {@link ClassCastException} because a list is not a waffle.
	 *
	 * <p>NOTE(review): assumes {@code mapReduce} returns a {@link Waffle} whose value
	 * type matches {@code T}; the cast is unchecked and cannot be verified from this
	 * file - confirm against {@code MapReduceService}.
	 *
	 * @param folder path of the folder whose files are processed
	 * @param strategy combines the existing and the new value when a key is present in both
	 * @return this waffle, after the merge
	 */
	@Override
	@SuppressWarnings("unchecked")
	public Waffle<T> update(String folder, ReduceStrategy<T> strategy) {
		Waffle<T> other =
				(Waffle<T>) MapReduceService.mapReduce(Splitter.folderToFileList(new File(folder)));

		return update(other, strategy);
	}

	/**
	 * Merges every entry of {@code other} into this waffle, in place.
	 *
	 * <p>For a key already present here the stored value becomes
	 * {@code strategy.apply(existing, incoming)}; for a new key the incoming value is
	 * stored as is. {@code strategy} is only invoked for keys present on both sides.
	 *
	 * @param other waffle whose entries are merged in; any {@link Waffle} implementation is accepted
	 * @param strategy combines the existing and the incoming value for shared keys
	 * @return this waffle, after the merge
	 * @throws NullPointerException if {@code other} is {@code null}
	 */
	@Override
	public Waffle<T> update(Waffle<T> other, ReduceStrategy<T> strategy) {
		if (other == null) {
			throw new NullPointerException("other waffle must not be null");
		}

		for (Entry<String, T> entry : entriesOf(other)) {
			String key = entry.getKey();
			T incoming = entry.getValue();

			if (this.containsKey(key)) {
				this.map.put(key, strategy.apply(this.map.get(key), incoming));
			} else {
				this.map.put(key, incoming);
			}
		}

		return this;
	}

	/**
	 * Picks the entry source for a merge: the backing map's entry set when
	 * {@code other} is a {@code WaffleImpl}, otherwise the sorted entry list
	 * exposed through the {@link Waffle} interface.
	 */
	private Iterable<Entry<String, T>> entriesOf(Waffle<T> other) {
		if (other instanceof WaffleImpl) {
			return ((WaffleImpl<T>) other).map.entrySet();
		}
		return other.getSortedUnModifiableList();
	}

	/**
	 * Folds {@code list} from left to right with {@code f}, starting at {@code initVal}.
	 *
	 * <p>The list is copied while holding the list's own monitor; the fold then runs
	 * on that copy outside the lock.
	 *
	 * @param list elements to combine
	 * @param f combining function, called as {@code f.apply(accumulated, element)}
	 * @param initVal start value; returned unchanged for an empty list
	 * @return the accumulated result
	 */
	static <E> E reduce(List<E> list, ReduceStrategy<E> f, E initVal) {
		List<E> snapshot;
		synchronized (list) {
			snapshot = new ArrayList<E>(list);
		}

		E result = initVal;
		for (E element : snapshot) {
			result = f.apply(result, element);
		}
		return result;
	}

	/**
	 * Strategy that adds two values as integers.
	 *
	 * <p>Both arguments are cast to {@link Integer}; any other runtime type results
	 * in a {@link ClassCastException}.
	 *
	 * @param <M> declared value type; must be {@code Integer} at runtime
	 */
	public static class DirtInt<M> implements ReduceStrategy<M> {

		/**
		 * @return the {@code int} sum of both arguments, boxed
		 */
		@SuppressWarnings("unchecked")
		@Override
		public M apply(M arg1, M arg2) {
			int sum = ((Integer) arg1).intValue() + ((Integer) arg2).intValue();
			return (M) Integer.valueOf(sum);
		}

	}

	/**
	 * Strategy that joins the string forms of two values with {@code '#'}.
	 *
	 * <p>The result is a {@link String} cast to {@code M} without a runtime check.
	 *
	 * @param <M> declared value type
	 */
	public static class DirtString<M> implements ReduceStrategy<M> {

		/**
		 * @return {@code arg1.toString() + "#" + arg2.toString()}
		 */
		@SuppressWarnings("unchecked")
		@Override
		public M apply(M arg1, M arg2) {
			StringBuilder joined = new StringBuilder(arg1.toString());
			joined.append("#").append(arg2.toString());

			return (M) joined.toString();
		}

	}

	/**
	 * Joins two strings with {@code '#'}.
	 *
	 * <p>No caller is visible in this file; the original note states it is called with
	 * a generic mechanism, so the method is kept as an instance method with its
	 * original signature.
	 */
	@SuppressWarnings("unused")
	// called with generic mechanism
	private String fDirt(String value, String value2) {
		StringBuilder joined = new StringBuilder(value);
		joined.append("#").append(value2);

		return joined.toString();
	}

	/**
	 * Demo: merges two waffles holding the keys {@code Value1..Value5}, each mapped to
	 * its index as a string, using {@link DirtString}, and prints the result to
	 * standard output.
	 */
	public static void testString() {
		HashMap<String, String> first = new HashMap<String, String>();
		HashMap<String, String> second = new HashMap<String, String>();

		for (int i = 1; i <= DEMO_ENTRY_COUNT; i++) {
			String key = "Value" + i;
			String value = String.valueOf(i);
			first.put(key, value);
			second.put(key, value);
		}

		WaffleImpl<String> waffle1 = new WaffleImpl<String>(first);
		WaffleImpl<String> waffle2 = new WaffleImpl<String>(second);

		System.out.println(waffle1.update(waffle2, new DirtString<String>()));
	}

	/**
	 * Demo: merges two waffles holding the keys {@code Value1..Value5}, each mapped to
	 * {@code 1}, using {@link DirtInt}, and prints the result to standard output.
	 */
	public static void testInteger() {
		HashMap<String, Integer> first = new HashMap<String, Integer>();
		HashMap<String, Integer> second = new HashMap<String, Integer>();

		for (int i = 1; i <= DEMO_ENTRY_COUNT; i++) {
			String key = "Value" + i;
			first.put(key, Integer.valueOf(1));
			second.put(key, Integer.valueOf(1));
		}

		WaffleImpl<Integer> waffle1 = new WaffleImpl<Integer>(first);
		WaffleImpl<Integer> waffle2 = new WaffleImpl<Integer>(second);

		System.out.println(waffle1.update(waffle2, new DirtInt<Integer>()));
	}

	/**
	 * Runs both demo methods.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) throws Exception {

		testString();
		testInteger();

		// TODO tests for file writing
		// String fileName = "waffle.txt";
		//WaffleUtils.writeToFile(waffle, new File(fileName));
		//System.out.println(WaffleUtils.readFromFile(new File(fileName)));
	}
}